
Overview

The platform includes two specialized schedulers for different data collection patterns:
  • snoozerScheduler: Urgency-based scheduling for live, high-frequency data (priceoverview, histogram, activity)
  • ClockworkScheduler: Fixed-interval scheduling for historical price data (pricehistory)
Both schedulers share a rate limiter and coordinate through the Orchestrator.

snoozerScheduler

Urgency-based scheduler for real-time market data with dynamic prioritization.

Class Definition

from src.snoozerScheduler import snoozerScheduler

scheduler = snoozerScheduler(
    live_items=items,
    rate_limiter=shared_limiter,
    config_path="config.yaml"
)

Constructor

live_items (List[dict] | None, default: None)
  Optional list of items to track. If None, loads from config.
rate_limiter (RateLimiter | None, default: None)
  Optional shared RateLimiter instance. If None, the client creates its own.
config_path (str, default: "config.yaml")
  Path to the YAML configuration file (used if live_items is None).

Attributes

live_items (List[dict])
  List of item configurations to track.
rate_limiter (RateLimiter)
  Rate limiter instance for API call throttling.
steam_client (SteamAPIClient | None)
  Steam API client (initialized in run()).
data_wizard (SQLinserts | None)
  Database writer (initialized in run()).

Methods

load_live_items()

Load all items from config whose apiid is not pricehistory.
def load_live_items(self) -> List[dict]:
Returns:
  List[dict]: List of live item configurations (priceoverview, histogram, activity)

calculate_urgency()

Calculate urgency score for an item.
def calculate_urgency(self, item: dict) -> float:
item (dict)
  Item configuration with last_update and polling-interval-in-seconds.
Returns:
  float: Urgency score:
  • >= 1.0: Overdue, needs immediate execution
  • < 1.0: Not yet due
  • 0.0: In cooldown period
  • inf: Never updated (highest priority)
Formula:
urgency = (time since last update) / (target polling interval)
Example:
item = {
    'market_hash_name': 'AK-47 | Redline (Field-Tested)',
    'polling-interval-in-seconds': 30,
    'last_update': datetime.now() - timedelta(seconds=45)
}

urgency = scheduler.calculate_urgency(item)
# urgency = 45 / 30 = 1.5 (overdue by 50%)
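For reference, the formula can be implemented as a self-contained sketch. This is a free function, not the real method: it takes `now` as a parameter for determinism and omits the cooldown handling the scheduler performs elsewhere.

```python
from datetime import datetime, timedelta

def calculate_urgency(item: dict, now: datetime) -> float:
    """Urgency = time since last update / target polling interval."""
    last = item.get('last_update')
    if last is None:
        return float('inf')  # never updated: highest priority
    elapsed = (now - last).total_seconds()
    return elapsed / item['polling-interval-in-seconds']

now = datetime(2025, 1, 1, 12, 0, 0)
item = {
    'market_hash_name': 'AK-47 | Redline (Field-Tested)',
    'polling-interval-in-seconds': 30,
    'last_update': now - timedelta(seconds=45),
}
print(calculate_urgency(item, now))  # 1.5 (overdue by 50%)
```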

calculate_min_sleep_duration()

Calculate minimum sleep time until ANY item becomes actionable.
def calculate_min_sleep_duration(self) -> float:
Returns:
  float: Sleep duration in seconds until the soonest item needs execution.
Checks all items and returns the shortest time until any item:
  • Reaches urgency 1.0 (overdue), OR
  • Exits 429 cooldown (skip_until reached)
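A minimal sketch of this calculation, under the assumption that items may carry last_update and skip_until datetimes (field names taken from this page; the real method reads the items and current time from the scheduler instance):

```python
from datetime import datetime, timedelta

def min_sleep_duration(items: list, now: datetime) -> float:
    """Shortest wait until any item reaches urgency 1.0 or exits cooldown."""
    waits = []
    for item in items:
        last = item.get('last_update')
        if last is None:
            return 0.0  # a never-updated item is actionable right now
        # Seconds until this item's urgency reaches 1.0.
        due_in = item['polling-interval-in-seconds'] - (now - last).total_seconds()
        skip_until = item.get('skip_until')
        if skip_until is not None:
            # An item in 429 cooldown cannot fire before skip_until.
            due_in = max(due_in, (skip_until - now).total_seconds())
        waits.append(max(due_in, 0.0))
    return min(waits) if waits else 0.0

now = datetime(2025, 1, 1, 12, 0, 0)
items = [
    {'polling-interval-in-seconds': 30, 'last_update': now - timedelta(seconds=10)},  # due in 20s
    {'polling-interval-in-seconds': 60, 'last_update': now - timedelta(seconds=50)},  # due in 10s
]
print(min_sleep_duration(items, now))  # 10.0
```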

apply_exponential_backoff()

Apply exponential backoff for rate limit (429), server (5xx), or network errors.
def apply_exponential_backoff(self, item: dict, error_code: int) -> None:
item (dict)
  Item configuration that received the error.
error_code (int)
  HTTP status code (429, 5xx) or 0 for network errors.
Backoff Strategy:
  • 1st error: skip 1 polling interval
  • 2nd consecutive: skip 2 intervals
  • 3rd consecutive: skip 4 intervals
  • Capped at 8x the polling interval
Formula:
skip_multiplier = min(2^(consecutive_backoffs - 1), 8)
skip_seconds = polling_interval * skip_multiplier
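The skip calculation above, as a standalone sketch:

```python
def backoff_skip_seconds(polling_interval: float, consecutive_backoffs: int) -> float:
    """Skip duration after the Nth consecutive transient error, capped at 8x."""
    skip_multiplier = min(2 ** (consecutive_backoffs - 1), 8)
    return polling_interval * skip_multiplier

# With a 30s polling interval: 30s, 60s, 120s, then capped at 240s.
for n in range(1, 6):
    print(n, backoff_skip_seconds(30, n))
```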

execute_item()

Execute the API call for a specific item.
async def execute_item(self, item: dict) -> None:
item (dict)
  Item configuration to execute.
Behavior:
  1. Check if item is in cooldown (skip if so)
  2. Execute appropriate API call based on apiid
  3. Store result to database
  4. Update last_update timestamp
  5. Reset backoff tracking on success
  6. Apply exponential backoff on errors

run()

Main scheduler loop using urgency-based algorithm.
async def run(self) -> None:
Algorithm:
  1. Calculate urgency for all items
  2. Execute ALL items with urgency >= 1.0
  3. If nothing was urgent, sleep until next item becomes urgent
  4. Repeat forever
Example:
import asyncio
from src.snoozerScheduler import snoozerScheduler

items = [
    {
        'market_hash_name': 'AK-47 | Redline (Field-Tested)',
        'apiid': 'priceoverview',
        'appid': 730,
        'polling-interval-in-seconds': 30
    },
    {
        'market_hash_name': 'AWP | Asiimov (Field-Tested)',
        'apiid': 'itemordershistogram',
        'appid': 730,
        'item_nameid': 176059193,
        'polling-interval-in-seconds': 60
    }
]

scheduler = snoozerScheduler(live_items=items)
await scheduler.run()

ClockworkScheduler

Fixed-interval scheduler for historical price data collection.

Class Definition

from src.clockworkScheduler import ClockworkScheduler

scheduler = ClockworkScheduler(
    items=history_items,
    rate_limiter=shared_limiter,
    config_path="config.yaml"
)

Constructor

items (List[dict] | None, default: None)
  Optional list of pricehistory items to track. If None, loads from config.
rate_limiter (RateLimiter | None, default: None)
  Optional shared RateLimiter instance. If None, the client creates its own.
config_path (str, default: "config.yaml")
  Path to the YAML configuration file (used if items is None).

Attributes

history_items (List[dict])
  List of pricehistory item configurations.
rate_limiter (RateLimiter)
  Rate limiter instance for API call throttling.
steam_client (SteamAPIClient | None)
  Steam API client (initialized in run()).
data_wizard (SQLinserts | None)
  Database writer (initialized in run()).

Methods

get_next_execution_time()

Calculate the next execution time (the next :30 past the hour).
def get_next_execution_time(self) -> datetime:
Returns:
  datetime: The next :30 past the hour (UTC).
Example:
# Current time: 14:25 UTC
next_run = scheduler.get_next_execution_time()
# Returns: 14:30 UTC

# Current time: 14:35 UTC
next_run = scheduler.get_next_execution_time()
# Returns: 15:30 UTC
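The rule shown in the example can be sketched as a free function (the real method takes no arguments and uses the current UTC time; `now` is a parameter here only to keep the sketch deterministic):

```python
from datetime import datetime, timedelta, timezone

def next_execution_time(now: datetime) -> datetime:
    """The next :30 past the hour at or after `now` (UTC)."""
    candidate = now.replace(minute=30, second=0, microsecond=0)
    if now >= candidate:
        candidate += timedelta(hours=1)  # :30 already passed this hour
    return candidate

print(next_execution_time(datetime(2025, 1, 1, 14, 25, tzinfo=timezone.utc)))  # 14:30 UTC
print(next_execution_time(datetime(2025, 1, 1, 14, 35, tzinfo=timezone.utc)))  # 15:30 UTC
```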

calculate_sleep_duration()

Calculate seconds to sleep until next execution.
def calculate_sleep_duration(self, next_execution: datetime) -> float:
next_execution (datetime)
  Target execution datetime.
Returns:
  float: Sleep duration in seconds.

execute_history_items()

Execute pricehistory API calls for all configured items.
async def execute_history_items(self) -> None:
Runs all history items in sequence, respecting the rate limiter. Retries transient errors (429, 5xx, network) with exponential backoff.

run_initial_fetch()

Run pricehistory once immediately when scheduler starts.
async def run_initial_fetch(self) -> None:
This ensures data is available immediately, then switches to hourly schedule.

run()

Main clockwork loop.
async def run(self) -> None:
Algorithm:
  1. Run pricehistory immediately on startup
  2. Calculate next :30 past the hour
  3. Sleep until that time
  4. Execute all pricehistory items
  5. Repeat from step 2
Schedule: Executes at 30 minutes past every UTC hour:
  • 00:30, 01:30, 02:30, …, 23:30
This timing accounts for Steam’s ~20-30 minute lag in updating historical data.
Example:
import asyncio
from src.clockworkScheduler import ClockworkScheduler

history_items = [
    {
        'market_hash_name': 'AK-47 | Redline (Field-Tested)',
        'apiid': 'pricehistory',
        'appid': 730,
        'polling-interval-in-seconds': 3600  # Ignored by ClockworkScheduler
    }
]

scheduler = ClockworkScheduler(items=history_items)
await scheduler.run()

Comparison

Feature     | snoozerScheduler                               | ClockworkScheduler
Use Case    | Live data (priceoverview, histogram, activity) | Historical data (pricehistory)
Algorithm   | Urgency-based (dynamic priority)               | Fixed-time (hourly at :30)
Scheduling  | Calculated per-item from polling interval      | Fixed: :30 past every UTC hour
Priority    | Most overdue item first                        | All items equal
Backoff     | Per-item exponential backoff                   | Global retry with backoff
Initial Run | Immediate (all items fire at startup)          | Immediate (then hourly)
Sleep Logic | Until next item overdue                        | Until next :30 past hour

Error Handling

Both schedulers handle errors gracefully:

Transient Errors (429, 5xx, network)

  • snoozerScheduler: Exponential backoff per item (cooldown period)
  • ClockworkScheduler: Retry with fixed backoff (30s, 60s, 120s, 240s)
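The ClockworkScheduler retry ladder can be sketched like this. TransientError is a stand-in for the 429/5xx/network failures the real client raises, and the sleep parameter exists only so the sketch can run without actually waiting:

```python
import asyncio

class TransientError(Exception):
    """Stand-in for 429/5xx/network errors from the real API client."""

async def retry_with_backoff(call, delays=(30, 60, 120, 240), sleep=asyncio.sleep):
    """Retry an async call on transient errors, waiting 30s/60s/120s/240s."""
    for delay in delays:
        try:
            return await call()
        except TransientError:
            await sleep(delay)
    return await call()  # final attempt; errors propagate to the caller

async def demo():
    attempts = 0
    async def flaky():
        nonlocal attempts
        attempts += 1
        if attempts < 3:
            raise TransientError("HTTP 429")
        return "ok"
    slept = []
    async def fake_sleep(seconds):
        slept.append(seconds)  # record delays instead of waiting
    result = await retry_with_backoff(flaky, sleep=fake_sleep)
    print(result, slept)  # ok [30, 60]

asyncio.run(demo())
```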

Authentication Errors (400, 401, 403)

  • snoozerScheduler: Log error, continue with other items
  • ClockworkScheduler: Retry with backoff (supports hot-swapping cookies)

Client Errors (other 4xx)

  • Log error, no retry (configuration issue)

Validation Errors

  • Log error, no retry (data structure mismatch)

Best Practices

  1. Share Rate Limiter: Always pass the same rate limiter instance to both schedulers via Orchestrator
  2. Configure Polling Intervals: Set intervals based on data freshness needs:
    • Price overview: 30-60s
    • Order histogram: 60-120s
    • Order activity: 30-60s
    • Price history: Ignored (fixed hourly)
  3. Monitor Urgency: Items with urgency consistently > 2.0 indicate underprovisioned rate limits
  4. Handle Backoff: Don’t manually override skip_until; let the scheduler manage cooldowns
  5. Use Orchestrator: Don’t instantiate schedulers directly in production; use the Orchestrator for proper coordination